Request-Response vs Event-based
NestJS 微服务支持两种消息模式:
| 模式 | 装饰器 | 特点 | 适用场景 |
|---|---|---|---|
| Request-Response | @MessagePattern() | 请求-响应模式,发送方等待回复 | 鉴权、数据查询等需要明确返回结果的场景 |
| Event-based | @EventPattern() | 广播式通信,发送方不等待回复 | 日志记录、通知推送、统计任务等不需要即时响应的场景 |
Event-based 模式特别适合以下情况:
- 流式消息处理 -- 消息频繁发送,不需要逐条等待响应
- 广播通知 -- 将消息广播出去,不关心谁接收、什么时候处理
- 异步任务 -- 如查询订单是否完成、用户统计数据生成等不紧急的任务
服务端:接收事件消息
使用 @EventPattern() 装饰器来监听事件消息。与 @MessagePattern() 不同,事件处理函数不需要返回值:
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
@Controller()
export class NotificationController {
// 监听用户创建事件
@EventPattern('user_created')
async handleUserCreated(@Payload() data: { userId: number; email: string }) {
console.log(`新用户注册: ${data.email}`);
// 发送欢迎邮件、初始化用户数据等
await this.notificationService.sendWelcomeEmail(data.email);
}
// 监听订单完成事件
@EventPattern('order_completed')
async handleOrderCompleted(@Payload() data: { orderId: string; amount: number }) {
console.log(`订单完成: ${data.orderId}, 金额: ${data.amount}`);
// 更新统计数据、发送短信通知等
await this.statsService.updateRevenue(data.amount);
}
// 多个 handler 可以监听同一个 pattern
@EventPattern('user_created')
async handleUserCreatedLog(@Payload() data: { userId: number; email: string }) {
// 记录审计日志
await this.auditLog.record('user_created', data);
}
}
typescript
关键特性:可以为同一个 pattern 注册多个事件处理器,当消息到达时,所有匹配的处理器都会被同步触发。
客户端:发布事件消息
使用 ClientProxy.emit() 方法发布事件消息。与 send() 不同,emit() 返回的是一个热 Observable,它会立即尝试发送消息,无论是否有订阅者。
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Injectable()
export class UserService {
constructor(
@Inject('NOTIFICATION_SERVICE') private client: ClientProxy,
) {}
async createUser(dto: CreateUserDto) {
const user = await this.userRepository.save(dto);
// 广播用户创建事件,不需要等待响应
this.client.emit('user_created', {
userId: user.id,
email: user.email,
});
return user;
}
}
typescript
send() vs emit() 对比
| 维度 | send() | emit() |
|---|---|---|
| 模式 | Request-Response | Event-based |
| 返回值 | 冷 Observable(需要订阅才会发送) | 热 Observable(立即发送) |
| 是否等待响应 | 是 | 否 |
| 装饰器匹配 | @MessagePattern() | @EventPattern() |
| 使用场景 | 需要返回数据的查询和操作 | 通知、日志、异步任务 |
实际应用场景示例
用户注册流程中的事件广播:
1. UserService.createUser() 创建用户
2. emit('user_created', { userId, email })
3. ┌─ NotificationService 收到事件 → 发送欢迎邮件
├─ StatsService 收到事件 → 更新注册统计
├─ AuditLogService 收到事件 → 记录审计日志
└─ ProfileService 收到事件 → 初始化用户资料
text
这种广播式的事件驱动架构让各个服务只需要关注自己感兴趣的事件,实现了服务间的松耦合。
事件处理中的错误处理
事件处理器中的异常不会被发送回客户端(因为 emit 不等待响应),因此需要在处理器内部做好错误处理:
@EventPattern('user_created')
async handleUserCreated(@Payload() data: { userId: number; email: string }) {
try {
await this.notificationService.sendWelcomeEmail(data.email);
} catch (error) {
// 记录错误日志,不会影响其他事件处理器
this.logger.error(`发送欢迎邮件失败: ${error.message}`, error.stack);
// 可以选择将失败的任务加入重试队列
}
}
typescript
↑